অ্যাপাচি ফ্লিঙ্ক (Apache Flink)

Fault Tolerance কনফিগারেশন এবং উদাহরণ

Latest Technologies - অ্যাপাচি ফ্লিঙ্ক (Apache Flink) - Flink এবং Fault Tolerance | NCTB BOOK

Apache Flink-এ Fault Tolerance কনফিগারেশন হল Flink অ্যাপ্লিকেশনগুলিকে ক্র্যাশ বা ত্রুটি থেকে পুনরুদ্ধার করতে সক্ষম করা। Flink স্ট্রিম প্রসেসিং প্ল্যাটফর্মে বিল্ট-ইন ফল্ট টলারেন্স সমর্থন রয়েছে যা অ্যাপ্লিকেশনের স্থায়ীত্ব এবং নির্ভরযোগ্যতা নিশ্চিত করে। Flink মূলত Checkpoint এবং Savepoint মেকানিজম ব্যবহার করে ফল্ট টলারেন্স নিশ্চিত করে।

Fault Tolerance কনফিগারেশন

Checkpointing কনফিগারেশন:

  • Flink Checkpointing-এর মাধ্যমে নির্দিষ্ট সময় পরপর ডেটার একটি স্ন্যাপশট তৈরি করে। Checkpoint ব্যবহারের জন্য, এটি কনফিগার করা আবশ্যক:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing সক্রিয় করা
env.enableCheckpointing(5000); // 5000 মিলিসেকেন্ড (৫ সেকেন্ড) অন্তর Checkpoint নেবে

Checkpoint Storage কনফিগারেশন:

  • Checkpoint কোথায় সংরক্ষণ করা হবে সেটি কনফিগার করা যেতে পারে:
  • এখানে HDFS ব্যবহৃত হয়েছে, তবে S3 বা লোকাল ফাইল সিস্টেমও ব্যবহার করা যেতে পারে।
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink-checkpoints");

State Backend কনফিগারেশন:

  • Flink-এর State Backend নির্ধারণ করে কিভাবে এবং কোথায় state সংরক্ষিত হবে:
  • এখানে HashMapStateBackend ব্যবহার করা হয়েছে। এছাড়াও, RocksDBStateBackend ব্যবহার করে উন্নত পারফরম্যান্স পাওয়া যায়।
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/flink-checkpoints"));

Savepoint কনফিগারেশন:

  • Savepoint একটি ম্যানুয়াল স্ন্যাপশট যা অ্যাপ্লিকেশনের বর্তমান অবস্থান ধরে রাখে। Savepoint সাধারণত ম্যানুয়ালি ট্রিগার করা হয়:
  • এটি সাধারণত অ্যাপ্লিকেশন আপগ্রেড বা ম্যান্টেনেন্সের সময় ব্যবহার করা হয়।
./bin/flink savepoint <jobId> <savepointDirectory>

উদাহরণ

নিচের উদাহরণটি একটি Checkpoint-enabled Flink অ্যাপ্লিকেশন:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FaultTolerantJob {
    public static void main(String[] args) throws Exception {
        // Execution Environment তৈরি করা
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Checkpointing সক্রিয় করা
        env.enableCheckpointing(10000); // ১০ সেকেন্ড অন্তর Checkpoint
        
        // State Backend সেট করা
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///tmp/flink-checkpoints"));
        
        // একটি ডাটা সোর্স সেট করা (উদাহরণস্বরূপ)
        env.fromElements(1, 2, 3, 4, 5)
           .map(value -> value * 2)
           .print();

        // কাজটি শুরু করা
        env.execute("Fault Tolerant Flink Job");
    }
}

কনফিগারেশন পরামর্শ

  • Checkpoint Interval: খুব বেশি ছোট বা বড় ইন্টারভাল দিলে পারফরম্যান্সে প্রভাব পড়তে পারে। সাধারণত, ৫-১৫ সেকেন্ডের মধ্যে রাখা উচিত।
  • Checkpoint Timeout: Checkpoint টাইমআউট কনফিগার করা যেতে পারে যদি Checkpoint সম্পূর্ণ হতে বেশি সময় লাগে:
env.getCheckpointConfig().setCheckpointTimeout(60000); // ৬০ সেকেন্ড টাইমআউট

এইভাবে, Flink-এর ফল্ট টলারেন্স মেকানিজম কনফিগার এবং ব্যবহার করে, আপনি একটি স্থিতিশীল এবং নির্ভরযোগ্য স্ট্রিম প্রসেসিং অ্যাপ্লিকেশন তৈরি করতে পারেন।

Promotion